/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.io.IteratorInputFormat;
import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
import org.apache.flink.api.java.io.PrimitiveInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.io.TextValueInputFormat;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.utils.PlanGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.WrappingRuntimeException;

import com.esotericsoftware.kryo.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * The ExecutionEnvironment is the context in which a program is executed. A {@link
 * LocalEnvironment} will cause execution in the current JVM, a {@link RemoteEnvironment} will cause
 * execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the
 * parallelism) and to interact with the outside world (data access).
 *
 * <p>Please note that the execution environment needs strong type information for the input and
 * return types of all operations that are executed. This means that the environment needs to know
 * that the return value of an operation is, for example, a Tuple of String and Integer. Because the
 * Java compiler throws much of the generic type information away, most methods attempt to re-obtain
 * that information using reflection. In certain cases, it may be necessary to manually supply that
 * information to some of the methods.
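 *
 * <p>A minimal usage sketch (the data is illustrative):
 *
 * <pre>{@code
 * ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 * env.fromElements(1, 2, 3)
 *         .filter(i -> i % 2 == 1)
 *         .print();
 * }</pre>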
 *
 * @see LocalEnvironment
 * @see RemoteEnvironment
 */
@Public
public class ExecutionEnvironment {

    /** The logger used by the environment and its subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

    /**
     * The factory for the environment of the context (local by default, cluster if invoked through
     * the command line).
     */
    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

    /** The ThreadLocal used to store {@link ExecutionEnvironmentFactory}. */
    private static final ThreadLocal<ExecutionEnvironmentFactory>
            threadLocalContextEnvironmentFactory = new ThreadLocal<>();

    /** The default parallelism used by local environments. */
    private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();

    // --------------------------------------------------------------------------------------------

    private final List<DataSink<?>> sinks = new ArrayList<>();

    private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>();

    private final ExecutionConfig config = new ExecutionConfig();

    /**
     * Result from the latest execution, to make it retrievable when using eager execution methods.
     */
    protected JobExecutionResult lastJobExecutionResult;

    /** Flag to indicate whether sinks have been cleared in previous executions. */
    private boolean wasExecuted = false;

    private final PipelineExecutorServiceLoader executorServiceLoader;

    private final Configuration configuration;

    private final ClassLoader userClassloader;

    private final List<JobListener> jobListeners = new ArrayList<>();

    /**
     * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
     * configure the {@link PipelineExecutor}.
     */
    @PublicEvolving
    public ExecutionEnvironment(final Configuration configuration) {
        this(configuration, null);
    }

    /**
     * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
     * configure the {@link PipelineExecutor}.
     *
     * <p>In addition, this constructor allows specifying the user code {@link ClassLoader}.
     */
    @PublicEvolving
    public ExecutionEnvironment(
            final Configuration configuration, final ClassLoader userClassloader) {
        this(new DefaultExecutorServiceLoader(), configuration, userClassloader);
    }

    /**
     * Creates a new {@link ExecutionEnvironment} that will use the given {@link Configuration} to
     * configure the {@link PipelineExecutor}.
     *
     * <p>In addition, this constructor allows specifying the {@link PipelineExecutorServiceLoader}
     * and user code {@link ClassLoader}.
     */
    @PublicEvolving
    public ExecutionEnvironment(
            final PipelineExecutorServiceLoader executorServiceLoader,
            final Configuration configuration,
            final ClassLoader userClassloader) {
        this.executorServiceLoader = checkNotNull(executorServiceLoader);
        this.configuration = new Configuration(checkNotNull(configuration));
        this.userClassloader =
                userClassloader == null ? getClass().getClassLoader() : userClassloader;

        // The configuration of a job or an operator can be specified at the following places:
        //     i) at the operator level, e.g. via Operator.setParallelism()
        //     ii) programmatically, e.g. via the env.setRestartStrategy() method
        //     iii) in the configuration passed here
        //
        // If specified in multiple places, the priority order is the above.
        //
        // Given this, it is safe to overwrite the execution config default values here, because
        // all other ways assume that the env is already instantiated, so they will overwrite the
        // value passed here.
        this.configure(this.configuration, this.userClassloader);
    }

    /** Creates a new Execution Environment. */
    protected ExecutionEnvironment() {
        this(new Configuration());
    }

    @Internal
    public ClassLoader getUserCodeClassLoader() {
        return userClassloader;
    }

    @Internal
    public PipelineExecutorServiceLoader getExecutorServiceLoader() {
        return executorServiceLoader;
    }

    @Internal
    public Configuration getConfiguration() {
        return this.configuration;
    }

    // --------------------------------------------------------------------------------------------
    //  Properties
    // --------------------------------------------------------------------------------------------

    /**
     * Gets the config object that defines execution parameters.
     *
     * @return The environment's execution configuration.
     */
    public ExecutionConfig getConfig() {
        return config;
    }

    /** Gets the registered {@link JobListener}s. */
    protected List<JobListener> getJobListeners() {
        return jobListeners;
    }

    /**
     * Gets the parallelism with which operations are executed by default. Operations can
     * individually override this value to use a specific parallelism via {@link
     * Operator#setParallelism(int)}. Other operations may need to run with a different parallelism
     * - for example, calling {@link
     * DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)} over the entire set
     * will eventually insert an operation that runs non-parallel (parallelism of one).
     *
     * @return The parallelism used by operations, unless they override that value. This method
     *     returns {@link ExecutionConfig#PARALLELISM_DEFAULT}, if the environment's default
     *     parallelism should be used.
     */
    public int getParallelism() {
        return config.getParallelism();
    }

    /**
     * Sets the parallelism for operations executed through this environment. Setting a parallelism
     * of x here will cause all operators (such as join, map, reduce) to run with x parallel
     * instances.
     *
     * <p>This method overrides the default parallelism for this environment. The {@link
     * LocalEnvironment} uses by default a value equal to the number of hardware contexts (CPU cores
     * / threads). When executing the program via the command line client from a JAR file, the
     * default parallelism is the one configured for that setup.
     *
     * @param parallelism The parallelism
     */
    public void setParallelism(int parallelism) {
        config.setParallelism(parallelism);
    }

    /**
     * Sets the restart strategy configuration. The configuration specifies which restart strategy
     * will be used for the execution graph in case of a restart.
     *
     * @param restartStrategyConfiguration Restart strategy configuration to be set
     */
    @PublicEvolving
    public void setRestartStrategy(
            RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
        config.setRestartStrategy(restartStrategyConfiguration);
    }

    /**
     * Returns the specified restart strategy configuration.
     *
     * @return The restart strategy configuration to be used
     */
    @PublicEvolving
    public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return config.getRestartStrategy();
    }

    /**
     * Sets the number of times that failed tasks are re-executed. A value of zero effectively
     * disables fault tolerance. A value of {@code -1} indicates that the system default value (as
     * defined in the configuration) should be used.
     *
     * @param numberOfExecutionRetries The number of times the system will try to re-execute failed
     *     tasks.
     * @deprecated This method will be replaced by {@link #setRestartStrategy}. The {@link
     *     RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
     *     execution retries.
     */
    @Deprecated
    @PublicEvolving
    public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
        config.setNumberOfExecutionRetries(numberOfExecutionRetries);
    }

    /**
     * Gets the number of times the system will try to re-execute failed tasks. A value of {@code
     * -1} indicates that the system default value (as defined in the configuration) should be used.
     *
     * @return The number of times the system will try to re-execute failed tasks.
     * @deprecated This method will be replaced by {@link #getRestartStrategy}. The {@link
     *     RestartStrategies.FixedDelayRestartStrategyConfiguration} contains the number of
     *     execution retries.
     */
    @Deprecated
    @PublicEvolving
    public int getNumberOfExecutionRetries() {
        return config.getNumberOfExecutionRetries();
    }

    /**
     * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
     *
     * @return The execution result from the latest job execution.
     */
    public JobExecutionResult getLastJobExecutionResult() {
        return this.lastJobExecutionResult;
    }

    // --------------------------------------------------------------------------------------------
    //  Registry for types and serializers
    // --------------------------------------------------------------------------------------------

    /**
     * Adds a new Kryo default serializer to the Runtime.
     *
     * <p>Note that the serializer instance must be serializable (as defined by
     * java.io.Serializable), because it may be distributed to the worker nodes by java
     * serialization.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializer The serializer to use.
     */
    public <T extends Serializer<?> & Serializable> void addDefaultKryoSerializer(
            Class<?> type, T serializer) {
        config.addDefaultKryoSerializer(type, serializer);
    }

    /**
     * Adds a new Kryo default serializer to the Runtime.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializerClass The class of the serializer to use.
     */
    public void addDefaultKryoSerializer(
            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
        config.addDefaultKryoSerializer(type, serializerClass);
    }

    /**
     * Registers the given type with a Kryo Serializer.
     *
     * <p>Note that the serializer instance must be serializable (as defined by
     * java.io.Serializable), because it may be distributed to the worker nodes by java
     * serialization.
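     *
     * <p>A minimal sketch ({@code MyType} and {@code MyTypeSerializer} are hypothetical; the
     * serializer must also implement {@code java.io.Serializable}, as noted above):
     *
     * <pre>{@code
     * env.registerTypeWithKryoSerializer(MyType.class, new MyTypeSerializer());
     * }</pre>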
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializer The serializer to use.
     */
    public <T extends Serializer<?> & Serializable> void registerTypeWithKryoSerializer(
            Class<?> type, T serializer) {
        config.registerTypeWithKryoSerializer(type, serializer);
    }

    /**
     * Registers the given Serializer via its class as a serializer for the given type at the
     * KryoSerializer.
     *
     * @param type The class of the types serialized with the given serializer.
     * @param serializerClass The class of the serializer to use.
     */
    public void registerTypeWithKryoSerializer(
            Class<?> type, Class<? extends Serializer<?>> serializerClass) {
        config.registerTypeWithKryoSerializer(type, serializerClass);
    }

    /**
     * Registers the given type with the serialization stack. If the type is eventually serialized
     * as a POJO, then the type is registered with the POJO serializer. If the type ends up being
     * serialized with Kryo, then it will be registered at Kryo to make sure that only tags are
     * written.
     *
     * @param type The class of the type to register.
     */
    public void registerType(Class<?> type) {
        if (type == null) {
            throw new NullPointerException("Cannot register null type class.");
        }

        TypeInformation<?> typeInfo = TypeExtractor.createTypeInfo(type);

        if (typeInfo instanceof PojoTypeInfo) {
            config.registerPojoType(type);
        } else {
            config.registerKryoType(type);
        }
    }

    /**
     * Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link
     * PipelineOptions#CACHED_FILES}. It will reconfigure {@link ExecutionEnvironment} and {@link
     * ExecutionConfig}.
     *
     * <p>It will change the value of a setting only if a corresponding option was set in the {@code
     * configuration}. If a key is not present, the current value of a field will remain untouched.
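     *
     * <p>For example (the option value is illustrative):
     *
     * <pre>{@code
     * Configuration conf = new Configuration();
     * conf.set(PipelineOptions.NAME, "my-pipeline");
     * env.configure(conf, env.getUserCodeClassLoader());
     * }</pre>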
     *
     * @param configuration a configuration to read the values from
     * @param classLoader a class loader to use when loading classes
     */
    @PublicEvolving
    public void configure(ReadableConfig configuration, ClassLoader classLoader) {
        configuration
                .getOptional(DeploymentOptions.JOB_LISTENERS)
                .ifPresent(listeners -> registerCustomListeners(classLoader, listeners));
        configuration
                .getOptional(PipelineOptions.CACHED_FILES)
                .ifPresent(
                        f -> {
                            this.cacheFile.clear();
                            this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
                        });
        configuration
                .getOptional(PipelineOptions.NAME)
                .ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName));
        config.configure(configuration, classLoader);
    }

    private void registerCustomListeners(
            final ClassLoader classLoader, final List<String> listeners) {
        for (String listener : listeners) {
            try {
                final JobListener jobListener =
                        InstantiationUtil.instantiate(listener, JobListener.class, classLoader);
                jobListeners.add(jobListener);
            } catch (FlinkException e) {
                throw new WrappingRuntimeException("Could not load JobListener : " + listener, e);
            }
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Data set creations
    // --------------------------------------------------------------------------------------------

    // ---------------------------------- Text Input Format ---------------------------------------

    /**
     * Creates a {@link DataSet} that represents the Strings produced by reading the given file line
     * wise. The file will be read with the UTF-8 character set.
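     *
     * <p>For example (the path is illustrative):
     *
     * <pre>{@code
     * DataSet<String> lines = env.readTextFile("file:///some/local/file");
     * }</pre>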
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @return A {@link DataSet} that represents the data read from the given file as text lines.
     */
    public DataSource<String> readTextFile(String filePath) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        return new DataSource<>(
                this,
                new TextInputFormat(new Path(filePath)),
                BasicTypeInfo.STRING_TYPE_INFO,
                Utils.getCallLocationName());
    }

    /**
     * Creates a {@link DataSet} that represents the Strings produced by reading the given file line
     * wise. The {@link java.nio.charset.Charset} with the given name will be used to read the
     * files.
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @param charsetName The name of the character set used to read the file.
     * @return A {@link DataSet} that represents the data read from the given file as text lines.
     */
    public DataSource<String> readTextFile(String filePath, String charsetName) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        TextInputFormat format = new TextInputFormat(new Path(filePath));
        format.setCharsetName(charsetName);
        return new DataSource<>(
                this, format, BasicTypeInfo.STRING_TYPE_INFO, Utils.getCallLocationName());
    }

    // -------------------------- Text Input Format With String Value------------------------------

    /**
     * Creates a {@link DataSet} that represents the Strings produced by reading the given file line
     * wise. This method is similar to {@link #readTextFile(String)}, but it produces a DataSet with
     * mutable {@link StringValue} objects, rather than Java Strings. StringValues can be used to
     * tune implementations to be less object and garbage collection heavy.
     *
     * <p>The file will be read with the UTF-8 character set.
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @return A {@link DataSet} that represents the data read from the given file as text lines.
     */
    public DataSource<StringValue> readTextFileWithValue(String filePath) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        return new DataSource<>(
                this,
                new TextValueInputFormat(new Path(filePath)),
                new ValueTypeInfo<>(StringValue.class),
                Utils.getCallLocationName());
    }

    /**
     * Creates a {@link DataSet} that represents the Strings produced by reading the given file line
     * wise. This method is similar to {@link #readTextFile(String, String)}, but it produces a
     * DataSet with mutable {@link StringValue} objects, rather than Java Strings. StringValues can
     * be used to tune implementations to be less object and garbage collection heavy.
     *
     * <p>The {@link java.nio.charset.Charset} with the given name will be used to read the files.
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @param charsetName The name of the character set used to read the file.
     * @param skipInvalidLines A flag to indicate whether to skip lines that cannot be read with the
     *     given character set.
     * @return A DataSet that represents the data read from the given file as text lines.
     */
    public DataSource<StringValue> readTextFileWithValue(
            String filePath, String charsetName, boolean skipInvalidLines) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        TextValueInputFormat format = new TextValueInputFormat(new Path(filePath));
        format.setCharsetName(charsetName);
        format.setSkipInvalidLines(skipInvalidLines);
        return new DataSource<>(
                this, format, new ValueTypeInfo<>(StringValue.class), Utils.getCallLocationName());
    }

    // ----------------------------------- Primitive Input Format ---------------------------------

    /**
     * Creates a {@link DataSet} that represents the primitive type produced by reading the given
     * file line wise. This method is similar to {@link #readCsvFile(String)} with a single field,
     * but it produces a DataSet of the primitive type directly, rather than wrapped in a {@link
     * org.apache.flink.api.java.tuple.Tuple1}.
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @param typeClass The primitive type class to be read.
     * @return A {@link DataSet} that represents the data read from the given file as primitive
     *     type.
     */
    public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        return new DataSource<>(
                this,
                new PrimitiveInputFormat<>(new Path(filePath), typeClass),
                TypeExtractor.getForClass(typeClass),
                Utils.getCallLocationName());
    }

    /**
     * Creates a {@link DataSet} that represents the primitive type produced by reading the given
     * file using the given delimiter. This method is similar to {@link #readCsvFile(String)} with
     * a single field, but it produces a DataSet of the primitive type directly, rather than
     * wrapped in a {@link org.apache.flink.api.java.tuple.Tuple1}.
     *
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @param delimiter The delimiter of the given file.
     * @param typeClass The primitive type class to be read.
     * @return A {@link DataSet} that represents the data read from the given file as primitive
     *     type.
     */
    public <X> DataSource<X> readFileOfPrimitives(
            String filePath, String delimiter, Class<X> typeClass) {
        Preconditions.checkNotNull(filePath, "The file path may not be null.");

        return new DataSource<>(
                this,
                new PrimitiveInputFormat<>(new Path(filePath), delimiter, typeClass),
                TypeExtractor.getForClass(typeClass),
                Utils.getCallLocationName());
    }

    // ----------------------------------- CSV Input Format ---------------------------------------

    /**
     * Creates a CSV reader to read a comma separated value (CSV) file. The reader has options to
     * define parameters and field types and will eventually produce the DataSet that corresponds to
     * the read and parsed CSV input.
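     *
     * <p>For example (the path and field types are illustrative):
     *
     * <pre>{@code
     * DataSet<Tuple2<String, Integer>> csvInput =
     *         env.readCsvFile("file:///some/local/file.csv")
     *                 .types(String.class, Integer.class);
     * }</pre>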
     *
     * @param filePath The path of the CSV file.
     * @return A CsvReader that can be used to configure the CSV input.
     */
    public CsvReader readCsvFile(String filePath) {
        return new CsvReader(filePath, this);
    }

    // ------------------------------------ File Input Format -------------------------------------

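    /**
     * Creates a {@link DataSet} that represents the data read from the given file using the given
     * {@link FileInputFormat}.
     *
     * @param inputFormat The input format used to read the file.
     * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
     *     "hdfs://host:port/file/path").
     * @return A {@link DataSet} that represents the data read from the given file.
     */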
    public <X> DataSource<X> readFile(FileInputFormat<X> inputFormat, String filePath) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }
        if (filePath == null) {
            throw new IllegalArgumentException("The file path must not be null.");
        }

        inputFormat.setFilePath(new Path(filePath));
        try {
            return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
        } catch (Exception e) {
            throw new InvalidProgramException(
                    "The type returned by the input format could not be automatically determined. "
                            + "Please specify the TypeInformation of the produced type explicitly by using the "
                            + "'createInput(InputFormat, TypeInformation)' method instead.",
                    e);
        }
    }

    // ----------------------------------- Generic Input Format -----------------------------------

    /**
     * Generic method to create an input {@link DataSet} from an {@link InputFormat}. The DataSet
     * will not be immediately created - instead, this method returns a DataSet that will be lazily
     * created from the input format once the program is executed.
     *
     * <p>Since all data sets need specific information about their types, this method needs to
     * determine the type of the data produced by the input format. It will attempt to determine the
     * data type by reflection, unless the input format implements the {@link ResultTypeQueryable}
     * interface. In the latter case, this method will invoke the {@link
     * ResultTypeQueryable#getProducedType()} method to determine the data type produced by the
     * input format.
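     *
     * <p>For example, with a text input format (the path is illustrative):
     *
     * <pre>{@code
     * DataSet<String> lines =
     *         env.createInput(new TextInputFormat(new Path("file:///some/local/file")));
     * }</pre>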
     *
     * @param inputFormat The input format used to create the data set.
     * @return A {@link DataSet} that represents the data created by the input format.
     * @see #createInput(InputFormat, TypeInformation)
     */
    public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }

        try {
            return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
        } catch (Exception e) {
            throw new InvalidProgramException(
                    "The type returned by the input format could not be automatically determined. "
                            + "Please specify the TypeInformation of the produced type explicitly by using the "
                            + "'createInput(InputFormat, TypeInformation)' method instead.",
                    e);
        }
    }

    /**
     * Generic method to create an input DataSet from an {@link InputFormat}. The {@link DataSet}
     * will not be immediately created - instead, this method returns a {@link DataSet} that will be
     * lazily created from the input format once the program is executed.
     *
     * <p>The {@link DataSet} is typed to the given TypeInformation. This method is intended for
     * input formats where the return type cannot be determined by reflection analysis, and that do
     * not implement the {@link ResultTypeQueryable} interface.
     *
     * @param inputFormat The input format used to create the data set.
     * @param producedType The TypeInformation for the produced data set.
     * @return A {@link DataSet} that represents the data created by the input format.
     * @see #createInput(InputFormat)
     */
    public <X> DataSource<X> createInput(
            InputFormat<X, ?> inputFormat, TypeInformation<X> producedType) {
        if (inputFormat == null) {
            throw new IllegalArgumentException("InputFormat must not be null.");
        }

        if (producedType == null) {
            throw new IllegalArgumentException("Produced type information must not be null.");
        }

        return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
    }

    // ----------------------------------- Collection ---------------------------------------

    /**
     * Creates a DataSet from the given non-empty collection. The type of the data set is that of
     * the elements in the collection.
     *
     * <p>The framework will try to determine the exact type from the collection elements. In case
     * of generic elements, it may be necessary to manually supply the type information via {@link
     * #fromCollection(Collection, TypeInformation)}.
     *
     * <p>Note that this operation will result in a non-parallel data source, i.e. a data source
     * with a parallelism of one.
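     *
     * <p>For example:
     *
     * <pre>{@code
     * DataSet<String> names = env.fromCollection(Arrays.asList("alice", "bob", "carol"));
     * }</pre>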
     *
     * @param data The collection of elements to create the data set from.
     * @return A DataSet representing the given collection.
     * @see #fromCollection(Collection, TypeInformation)
     */
    public <X> DataSource<X> fromCollection(Collection<X> data) {
        if (data == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (data.isEmpty()) {
            throw new IllegalArgumentException("The collection must not be empty.");
        }

        X firstValue = data.iterator().next();

        TypeInformation<X> type = TypeExtractor.getForObject(firstValue);
        CollectionInputFormat.checkCollection(data, type.getTypeClass());
        return new DataSource<>(
                this,
                new CollectionInputFormat<>(data, type.createSerializer(config)),
                type,
                Utils.getCallLocationName());
    }

    /**
     * Creates a DataSet from the given non-empty collection. Note that this operation will result
     * in a non-parallel data source, i.e. a data source with a parallelism of one.
     *
     * <p>The returned DataSet is typed to the given TypeInformation.
     *
     * @param data The collection of elements to create the data set from.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the given collection.
     * @see #fromCollection(Collection)
     */
    public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
        return fromCollection(data, type, Utils.getCallLocationName());
    }

    private <X> DataSource<X> fromCollection(
            Collection<X> data, TypeInformation<X> type, String callLocationName) {
        CollectionInputFormat.checkCollection(data, type.getTypeClass());
        return new DataSource<>(
                this,
                new CollectionInputFormat<>(data, type.createSerializer(config)),
                type,
                callLocationName);
    }

    /**
     * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
     * the actual execution happens, the type of data returned by the iterator must be given
     * explicitly in the form of the type class (this is due to the fact that the Java compiler
     * erases the generic type information).
     *
     * <p>Note that this operation will result in a non-parallel data source, i.e. a data source
     * with a parallelism of one.
     *
     * @param data The collection of elements to create the data set from.
     * @param type The class of the data produced by the iterator. Must not be a generic class.
     * @return A DataSet representing the elements in the iterator.
     * @see #fromCollection(Iterator, TypeInformation)
     */
    public <X> DataSource<X> fromCollection(Iterator<X> data, Class<X> type) {
        return fromCollection(data, TypeExtractor.getForClass(type));
    }

    /**
     * Creates a DataSet from the given iterator. Because the iterator will remain unmodified until
     * the actual execution happens, the type of data returned by the iterator must be given
     * explicitly in the form of the type information. This method is useful for cases where the
     * type is generic. In that case, the type class (as given in {@link #fromCollection(Iterator,
     * Class)}) does not supply all type information.
     *
     * <p>Note that this operation will result in a non-parallel data source, i.e. a data source
     * with a parallelism of one.
     *
     * @param data The collection of elements to create the data set from.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the elements in the iterator.
     * @see #fromCollection(Iterator, Class)
     */
    public <X> DataSource<X> fromCollection(Iterator<X> data, TypeInformation<X> type) {
        return new DataSource<>(
                this, new IteratorInputFormat<>(data), type, Utils.getCallLocationName());
    }

    /**
     * Creates a new data set that contains the given elements. The elements must all be of the same
     * type, for example, all {@link String}s or all {@link Integer}s. The sequence of elements must
     * not be empty.
     *
     * <p>The framework will try to determine the exact type from the collection elements. In case
     * of generic elements, it may be necessary to manually supply the type information via {@link
     * #fromCollection(Collection, TypeInformation)}.
     *
     * <p>Note that this operation will result in a non-parallel data source, i.e. a data source
     * with a parallelism of one.
     *
     * @param data The elements to make up the data set.
     * @return A DataSet representing the given list of elements.
     */
    @SafeVarargs
    public final <X> DataSource<X> fromElements(X... data) {
        if (data == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (data.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }

        TypeInformation<X> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(data[0]);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create TypeInformation for type "
                            + data[0].getClass().getName()
                            + "; please specify the TypeInformation manually via "
                            + "ExecutionEnvironment#fromCollection(Collection, TypeInformation)",
                    e);
        }

        return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
    }

    /**
     * Creates a new data set that contains the given elements. The framework will determine the
     * type according to the base class supplied by the user. The elements must be of that base type
     * or a subclass of it. The sequence of elements must not be empty. Note that this operation
     * will result in a non-parallel data source, i.e. a data source with a parallelism of one.
     *
     * @param type The base class type for every element in the collection.
     * @param data The elements to make up the data set.
     * @return A DataSet representing the given list of elements.
     */
    @SafeVarargs
    public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
        if (data == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (data.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }

        TypeInformation<X> typeInfo;
        try {
            typeInfo = TypeExtractor.getForClass(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create TypeInformation for type "
                            + type.getName()
                            + "; please specify the TypeInformation manually via "
                            + "ExecutionEnvironment#fromCollection(Collection, TypeInformation)",
                    e);
        }

        return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
    }

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable,
     * allowing the framework to create a parallel data source that returns the elements in the
     * iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type
     * of data returned by the iterator must be given explicitly in the form of the type class (this
     * is due to the fact that the Java compiler erases the generic type information).
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The class of the data produced by the iterator. Must not be a generic class.
     * @return A DataSet representing the elements in the iterator.
     * @see #fromParallelCollection(SplittableIterator, TypeInformation)
     */
    public <X> DataSource<X> fromParallelCollection(SplittableIterator<X> iterator, Class<X> type) {
        return fromParallelCollection(iterator, TypeExtractor.getForClass(type));
    }

    /**
     * Creates a new data set that contains elements in the iterator. The iterator is splittable,
     * allowing the framework to create a parallel data source that returns the elements in the
     * iterator.
     *
     * <p>Because the iterator will remain unmodified until the actual execution happens, the type
     * of data returned by the iterator must be given explicitly in the form of the type
     * information. This method is useful for cases where the type is generic. In that case, the
     * type class (as given in {@link #fromParallelCollection(SplittableIterator, Class)}) does not
     * supply all type information.
     *
     * @param iterator The iterator that produces the elements of the data set.
     * @param type The TypeInformation for the produced data set.
     * @return A DataSet representing the elements in the iterator.
     * @see #fromParallelCollection(SplittableIterator, Class)
     */
    public <X> DataSource<X> fromParallelCollection(
            SplittableIterator<X> iterator, TypeInformation<X> type) {
        return fromParallelCollection(iterator, type, Utils.getCallLocationName());
    }

    // private helper for passing different call location names
    private <X> DataSource<X> fromParallelCollection(
            SplittableIterator<X> iterator, TypeInformation<X> type, String callLocationName) {
        return new DataSource<>(
                this, new ParallelIteratorInputFormat<>(iterator), type, callLocationName);
    }

    /**
     * Creates a new data set that contains a sequence of numbers. The data set will be created in
     * parallel, so there is no guarantee about the order of the elements.
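     *
     * <p>For example:
     *
     * <pre>{@code
     * DataSet<Long> numbers = env.generateSequence(1, 100);
     * }</pre>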
     *
     * @param from The number to start at (inclusive).
     * @param to The number to stop at (inclusive).
     * @return A DataSet containing all numbers in the {@code [from, to]} interval.
     */
    public DataSource<Long> generateSequence(long from, long to) {
        return fromParallelCollection(
                new NumberSequenceIterator(from, to),
                BasicTypeInfo.LONG_TYPE_INFO,
                Utils.getCallLocationName());
    }

    // --------------------------------------------------------------------------------------------
    //  Executing
    // --------------------------------------------------------------------------------------------

    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * have resulted in a "sink" operation. Sink operations are for example printing results ({@link
     * DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)}, {@link
     * DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with a generated default name.
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program execution fails.
     */
    public JobExecutionResult execute() throws Exception {
        return execute(getJobName());
    }

    /**
     * Triggers the program execution. The environment will execute all parts of the program that
     * have resulted in a "sink" operation. Sink operations are for example printing results ({@link
     * DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)}, {@link
     * DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other generic
     * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with the given job name.
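     *
     * <p>A typical sequence (where {@code data} is a previously created {@link DataSet}; the
     * output path is illustrative):
     *
     * <pre>{@code
     * data.writeAsText("file:///some/output");
     * JobExecutionResult result = env.execute("MyJob");
     * }</pre>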
     *
     * @return The result of the job execution, containing elapsed time and accumulators.
     * @throws Exception Thrown, if the program execution fails.
     */
    public JobExecutionResult execute(String jobName) throws Exception {
        final JobClient jobClient = executeAsync(jobName);

        try {
            if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                lastJobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            jobListeners.forEach(
                    jobListener -> jobListener.onJobExecuted(lastJobExecutionResult, null));

        } catch (Throwable t) {
            // get() on the JobExecutionResult Future will throw an ExecutionException. This
            // behaviour was largely not there in Flink versions before the PipelineExecutor
            // refactoring so we should strip that exception.
            Throwable strippedException = ExceptionUtils.stripExecutionException(t);

            jobListeners.forEach(
                    jobListener -> {
                        jobListener.onJobExecuted(null, strippedException);
                    });
            ExceptionUtils.rethrowException(strippedException);
        }

        return lastJobExecutionResult;
    }

    /**
     * Register a {@link JobListener} in this environment. The {@link JobListener} will be notified
     * on specific job status changes.
     */
    @PublicEvolving
    public void registerJobListener(JobListener jobListener) {
        checkNotNull(jobListener, "JobListener cannot be null");
        jobListeners.add(jobListener);
    }

    /** Clear all registered {@link JobListener}s. */
    @PublicEvolving
    public void clearJobListeners() {
        this.jobListeners.clear();
    }

    /**
     * Triggers the program execution asynchronously. The environment will execute all parts of the
     * program that have resulted in a "sink" operation. Sink operations are for example printing
     * results ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other
     * generic data sinks created with {@link
     * DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with a generated default name.
     *
     * @return A {@link JobClient} that can be used to communicate with the submitted job, available
     *     once the job submission has succeeded.
     * @throws Exception Thrown, if the program submission fails.
     */
    @PublicEvolving
    public final JobClient executeAsync() throws Exception {
        return executeAsync(getJobName());
    }

    /**
     * Triggers the program execution asynchronously. The environment will execute all parts of the
     * program that have resulted in a "sink" operation. Sink operations are for example printing
     * results ({@link DataSet#print()}), writing results (e.g. {@link DataSet#writeAsText(String)},
     * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}), or other
     * generic data sinks created with {@link
     * DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
     *
     * <p>The program execution will be logged and displayed with the given job name.
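     *
     * <p>For example (the job name is illustrative):
     *
     * <pre>{@code
     * JobClient jobClient = env.executeAsync("MyJob");
     * // ... do other work, then optionally wait for the job result:
     * JobExecutionResult result = jobClient.getJobExecutionResult().get();
     * }</pre>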
     *
     * @return A {@link JobClient} that can be used to communicate with the submitted job, available
     *     once the job submission has succeeded.
     * @throws Exception Thrown, if the program submission fails.
     */
    @PublicEvolving
    public JobClient executeAsync(String jobName) throws Exception {
        checkNotNull(
                configuration.get(DeploymentOptions.TARGET),
                "No execution.target specified in your configuration file.");

        final Plan plan = createProgramPlan(jobName);
        final PipelineExecutorFactory executorFactory =
                executorServiceLoader.getExecutorFactory(configuration);

        checkNotNull(
                executorFactory,
                "Cannot find compatible factory for specified execution.target (=%s)",
                configuration.get(DeploymentOptions.TARGET));

        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(plan, configuration, userClassloader);

        try {
            JobClient jobClient = jobClientFuture.get();
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
            return jobClient;
        } catch (Throwable t) {
            jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
            ExceptionUtils.rethrow(t);

            // make javac happy, this code path will not be reached
            return null;
        }
    }

    /**
     * Creates the plan with which the system will execute the program, and returns it as a String
     * using a JSON representation of the execution data flow graph.
     *
     * @return The execution plan of the program, as a JSON String.
     * @throws Exception Thrown, if the compiler could not be instantiated.
     */
    public String getExecutionPlan() throws Exception {
        Plan p = createProgramPlan(getJobName(), false);
        return ExecutionPlanUtil.getExecutionPlanAsJSON(p);
    }

    /**
     * Registers a file at the distributed cache under the given name. The file will be accessible
     * from any user-defined function in the (distributed) runtime under a local path. Files may be
     * local files (which will be distributed via BlobServer), or files in a distributed file
     * system. The runtime will copy the files temporarily to a local cache, if needed.
     *
     * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside
     * UDFs via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
     * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via {@link
     * org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
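     *
     * <p>A sketch of registering a file and accessing it from a UDF (the path and name are
     * illustrative):
     *
     * <pre>{@code
     * env.registerCachedFile("hdfs://host:port/and/path", "myFile");
     *
     * // later, inside a RichFunction:
     * File file = getRuntimeContext().getDistributedCache().getFile("myFile");
     * }</pre>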
     *
     * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
     *     "hdfs://host:port/and/path")
     * @param name The name under which the file is registered.
     */
    public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, false);
    }

    /**
     * Registers a file at the distributed cache under the given name. The file will be accessible
     * from any user-defined function in the (distributed) runtime under a local path. Files may be
     * local files (which will be distributed via BlobServer), or files in a distributed file
     * system. The runtime will copy the files temporarily to a local cache, if needed.
     *
     * <p>The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside
     * UDFs via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
     * provides access to the {@link org.apache.flink.api.common.cache.DistributedCache} via {@link
     * org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
     *
     * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
     *     "hdfs://host:port/and/path")
     * @param name The name under which the file is registered.
     * @param executable flag indicating whether the file should be executable
     */
    public void registerCachedFile(String filePath, String name, boolean executable) {
        this.cacheFile.add(new Tuple2<>(name, new DistributedCacheEntry(filePath, executable)));
    }

    /**
     * Creates the program's {@link Plan}. The plan is a description of all data sources, data
     * sinks, and operations and how they interact, as an isolated unit that can be executed with a
     * {@link PipelineExecutor}. Obtaining a plan and starting it with an executor is an alternative
     * way to run a program and is only possible if the program consists only of distributed
     * operations. This automatically starts a new stage of execution.
     *
     * @return The program's plan.
     */
    @Internal
    public Plan createProgramPlan() {
        return createProgramPlan(getJobName());
    }

    /**
     * Creates the program's {@link Plan}. The plan is a description of all data sources, data
     * sinks, and operations and how they interact, as an isolated unit that can be executed with a
     * {@link PipelineExecutor}. Obtaining a plan and starting it with an executor is an alternative
     * way to run a program and is only possible if the program consists only of distributed
     * operations. This automatically starts a new stage of execution.
     *
     * @param jobName The name attached to the plan (displayed in logs and monitoring).
     * @return The program's plan.
     */
    @Internal
    public Plan createProgramPlan(String jobName) {
        return createProgramPlan(jobName, true);
    }

    /**
     * Creates the program's {@link Plan}. The plan is a description of all data sources, data
     * sinks, and operations and how they interact, as an isolated unit that can be executed with a
     * {@link PipelineExecutor}. Obtaining a plan and starting it with an executor is an alternative
     * way to run a program and is only possible if the program consists only of distributed
     * operations.
     *
     * @param jobName The name attached to the plan (displayed in logs and monitoring).
     * @param clearSinks Whether or not to start a new stage of execution.
     * @return The program's plan.
     */
    @Internal
    public Plan createProgramPlan(String jobName, boolean clearSinks) {
        checkNotNull(jobName);

        if (this.sinks.isEmpty()) {
            if (wasExecuted) {
                throw new RuntimeException(
                        "No new data sinks have been defined since the "
                                + "last execution. The last execution refers to the latest call to "
                                + "'execute()', 'count()', 'collect()', or 'print()'.");
            } else {
                throw new RuntimeException(
                        "No data sinks have been created yet. "
                                + "A program needs at least one sink that consumes data. "
                                + "Examples are writing the data set or printing it.");
            }
        }

        final PlanGenerator generator =
                new PlanGenerator(sinks, config, getParallelism(), cacheFile, jobName);
        final Plan plan = generator.generate();

        // clear all the sinks such that the next execution does not redo everything
        if (clearSinks) {
            this.sinks.clear();
            wasExecuted = true;
        }

        return plan;
    }

    /**
     * Adds the given sink to this environment. Only sinks that have been added will be executed
     * once the {@link #execute()} or {@link #execute(String)} method is called.
     *
     * @param sink The sink to add for execution.
     */
    @Internal
    void registerDataSink(DataSink<?> sink) {
        this.sinks.add(sink);
    }

    /**
     * Gets the job name. If no user-defined job name is found in the configuration, a default name
     * based on the timestamp at which this method is invoked is returned.
     *
     * @return A job name.
     */
    private String getJobName() {
        return configuration.getString(
                PipelineOptions.NAME, "Flink Java Job at " + Calendar.getInstance().getTime());
    }

    // --------------------------------------------------------------------------------------------
    //  Instantiation of Execution Contexts
    // --------------------------------------------------------------------------------------------

    /**
     * Creates an execution environment that represents the context in which the program is
     * currently executed. If the program is invoked standalone, this method returns a local
     * execution environment, as returned by {@link #createLocalEnvironment()}. If the program is
     * invoked from within the command line client to be submitted to a cluster, this method returns
     * the execution environment of this cluster.
     *
     * @return The execution environment of the context in which the program is executed.
     */
    public static ExecutionEnvironment getExecutionEnvironment() {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
                .orElseGet(ExecutionEnvironment::createLocalEnvironment);
    }

    /**
     * Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will
     * execute in a single thread in the current JVM. It is very fast but will fail if the data does
     * not fit into memory. The parallelism will always be 1. This is useful during implementation
     * and for debugging.
     *
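     * <p>A minimal usage sketch (illustrative only):
     *
     * <pre>{@code
     * CollectionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
     * env.fromElements("a", "b", "c").print();
     * }</pre>
     *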
     * @return A collection-based execution environment.
     */
    @PublicEvolving
    public static CollectionEnvironment createCollectionsEnvironment() {
        CollectionEnvironment ce = new CollectionEnvironment();
        ce.setParallelism(1);
        return ce;
    }

    /**
     * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
     * multi-threaded fashion in the same JVM as the environment was created in. The default
     * parallelism of the local environment is the number of hardware contexts (CPU cores /
     * threads), unless it was specified differently by {@link #setDefaultLocalParallelism(int)}.
     *
     * @return A local execution environment.
     */
    public static LocalEnvironment createLocalEnvironment() {
        return createLocalEnvironment(defaultLocalDop);
    }

    /**
     * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
     * multi-threaded fashion in the same JVM as the environment was created in. It will use the
     * parallelism specified in the parameter.
     *
     * @param parallelism The parallelism for the local environment.
     * @return A local execution environment with the specified parallelism.
     */
    public static LocalEnvironment createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(new Configuration(), parallelism);
    }

    /**
     * Creates a {@link LocalEnvironment}. The local execution environment will run the program in a
     * multi-threaded fashion in the same JVM as the environment was created in. It will apply the
     * given custom configuration and use the default parallelism, unless the configuration
     * overrides it.
     *
     * @param customConfiguration Pass a custom configuration to the LocalEnvironment.
     * @return A local execution environment configured with the given configuration.
     */
    public static LocalEnvironment createLocalEnvironment(Configuration customConfiguration) {
        return createLocalEnvironment(customConfiguration, -1);
    }

    /**
     * Creates a {@link LocalEnvironment} for local program execution that also starts the web
     * monitoring UI.
     *
     * <p>The local execution environment will run the program in a multi-threaded fashion in the
     * same JVM as the environment was created in. It will use the default parallelism, unless it
     * is overridden by the given configuration.
     *
     * <p>If the configuration key 'rest.port' was set in the configuration, that particular port
     * will be used for the web UI. Otherwise, the default port (8081) will be used.
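     *
     * <p>For example (sketch; the port value is an arbitrary choice):
     *
     * <pre>{@code
     * Configuration conf = new Configuration();
     * conf.setInteger(RestOptions.PORT, 8082);
     * ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
     * }</pre>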
     */
    @PublicEvolving
    public static ExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration conf) {
        checkNotNull(conf, "conf");

        if (!conf.contains(RestOptions.PORT)) {
            // explicitly set this option so that it's not set to 0 later
            conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
        }

        return createLocalEnvironment(conf, -1);
    }

    /**
     * Creates a {@link LocalEnvironment} which is used for executing Flink jobs.
     *
     * @param configuration to start the {@link LocalEnvironment} with
     * @param defaultParallelism to initialize the {@link LocalEnvironment} with
     * @return {@link LocalEnvironment}
     */
    private static LocalEnvironment createLocalEnvironment(
            Configuration configuration, int defaultParallelism) {
        final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);

        if (defaultParallelism > 0) {
            localEnvironment.setParallelism(defaultParallelism);
        }

        return localEnvironment;
    }

    /**
     * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program to a
     * cluster for execution. Note that all file paths used in the program must be accessible from
     * the cluster. The execution will use the cluster's default parallelism, unless the parallelism
     * is set explicitly via {@link ExecutionEnvironment#setParallelism(int)}.
     *
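     * <p>A usage sketch (host, port, and jar path are placeholders):
     *
     * <pre>{@code
     * ExecutionEnvironment env =
     *         ExecutionEnvironment.createRemoteEnvironment(
     *                 "jobmanager-host", 8081, "/path/to/program.jar");
     * }</pre>
     *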
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static ExecutionEnvironment createRemoteEnvironment(
            String host, int port, String... jarFiles) {
        return new RemoteEnvironment(host, port, jarFiles);
    }

    /**
     * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program to a
     * cluster for execution. Note that all file paths used in the program must be accessible from
     * the cluster. The custom configuration is used to configure Akka-specific parameters for the
     * client only; the program parallelism can be set via {@link
     * ExecutionEnvironment#setParallelism(int)}.
     *
     * <p>Cluster configuration has to be done in the remotely running Flink instance.
     *
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param clientConfiguration Configuration used by the client that connects to the cluster.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static ExecutionEnvironment createRemoteEnvironment(
            String host, int port, Configuration clientConfiguration, String... jarFiles) {
        return new RemoteEnvironment(host, port, clientConfiguration, jarFiles, null);
    }

    /**
     * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program to a
     * cluster for execution. Note that all file paths used in the program must be accessible from
     * the cluster. The execution will use the specified parallelism.
     *
     * @param host The host name or address of the master (JobManager), where the program should be
     *     executed.
     * @param port The port of the master (JobManager), where the program should be executed.
     * @param parallelism The parallelism to use during the execution.
     * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the
     *     program uses user-defined functions, user-defined input formats, or any libraries, those
     *     must be provided in the JAR files.
     * @return A remote environment that executes the program on a cluster.
     */
    public static ExecutionEnvironment createRemoteEnvironment(
            String host, int port, int parallelism, String... jarFiles) {
        RemoteEnvironment rec = new RemoteEnvironment(host, port, jarFiles);
        rec.setParallelism(parallelism);
        return rec;
    }

    // --------------------------------------------------------------------------------------------
    //  Default parallelism for local execution
    // --------------------------------------------------------------------------------------------

    /**
     * Gets the default parallelism that will be used for the local execution environment created by
     * {@link #createLocalEnvironment()}.
     *
     * @return The default local parallelism
     */
    public static int getDefaultLocalParallelism() {
        return defaultLocalDop;
    }

    /**
     * Sets the default parallelism that will be used for the local execution environment created by
     * {@link #createLocalEnvironment()}.
     *
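     * <p>For example (sketch; the value 4 is arbitrary):
     *
     * <pre>{@code
     * ExecutionEnvironment.setDefaultLocalParallelism(4);
     * LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(); // parallelism is 4
     * }</pre>
     *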
     * @param parallelism The parallelism to use as the default local parallelism.
     */
    public static void setDefaultLocalParallelism(int parallelism) {
        defaultLocalDop = parallelism;
    }

    // --------------------------------------------------------------------------------------------
    //  Methods to control the context environment and creation of explicit environments other
    //  than the context environment
    // --------------------------------------------------------------------------------------------

    /**
     * Sets a context environment factory that creates the context environment for running programs
     * with pre-configured environments, for example when running programs from the command line.
     *
     * <p>When the context environment factory is set, no other environments can be explicitly used.
     *
     * @param ctx The context environment factory.
     */
    protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = Preconditions.checkNotNull(ctx);
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

    /**
     * Un-sets the context environment factory. After this method is called, calls to {@link
     * #getExecutionEnvironment()} will again return a default local execution environment, and it
     * is possible to explicitly instantiate a LocalEnvironment and a RemoteEnvironment.
     */
    protected static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
        threadLocalContextEnvironmentFactory.remove();
    }

    /**
     * Checks whether it is currently permitted to explicitly instantiate a LocalEnvironment or a
     * RemoteEnvironment.
     *
     * @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
     *     RemoteEnvironment, false otherwise.
     */
    @Internal
    public static boolean areExplicitEnvironmentsAllowed() {
        return contextEnvironmentFactory == null
                && threadLocalContextEnvironmentFactory.get() == null;
    }
}
